feat: Pub/sub#244
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a new Google Cloud Pub/Sub client library, including core client functionality, topic and subscription management, and support for the Pub/Sub emulator. The review identified several critical issues: a race condition and token refresh problem in the authentication logic, a resource leak where the auth client is not closed, an overly restrictive project ID requirement for the emulator, an incorrect parsing logic for IPv6 emulator hosts, and code duplication in message mapping. I have recommended addressing these issues to improve the reliability and maintainability of the client.
| _authClient ??= await auth.clientViaApplicationDefaultCredentials( | ||
| scopes: ['https://www.googleapis.com/auth/pubsub'], | ||
| ); | ||
| final token = _authClient!.credentials.accessToken.data; | ||
| return CallOptions(metadata: {'authorization': 'Bearer $token'}); |
There was a problem hiding this comment.
There are several issues with the current authentication logic:
- Race Condition: Multiple concurrent RPCs could trigger multiple calls to clientViaApplicationDefaultCredentials before _authClient is set.
- Token Refresh: AutoRefreshingAuthClient only refreshes tokens when used via its HTTP interface. Since this code manually extracts the token string for gRPC, the token will eventually expire and never be refreshed. You should call refreshCredentials() to ensure a valid token.
- Resource Leak: The _authClient is never closed. It should be closed in the close() method of the PubSub class.
| if (host.contains(':')) { | ||
| final parts = host.split(':'); | ||
| cleanHost = parts[0]; | ||
| port = int.parse(parts[1]); | ||
| } |
There was a problem hiding this comment.
The current logic for splitting the host and port fails for IPv6 addresses (e.g., [::1]:8085). Using lastIndexOf(':') and checking for the closing bracket ] is a more robust approach for parsing the port.
| if (host.contains(':')) { | |
| final parts = host.split(':'); | |
| cleanHost = parts[0]; | |
| port = int.parse(parts[1]); | |
| } | |
| final lastColon = host.lastIndexOf(':'); | |
| if (lastColon != -1 && !host.endsWith(']')) { | |
| cleanHost = host.substring(0, lastColon); | |
| port = int.parse(host.substring(lastColon + 1)); | |
| } |
| Future<void> close() async { | ||
| await _channel.shutdown(); | ||
| } |
There was a problem hiding this comment.
| return response.receivedMessages | ||
| .map( | ||
| (m) => ReceivedMessage( | ||
| ackId: m.ackId, | ||
| message: Message( | ||
| data: m.message.data, | ||
| attributes: m.message.attributes, | ||
| messageId: m.message.messageId, | ||
| publishTime: m.message.publishTime.toDateTime(), | ||
| ), | ||
| ), | ||
| ) | ||
| .toList(); |
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Initial implementation. Only basic functionality implemented.
Streaming pull is implemented.
No built-in retry yet.
Tools to fetch and compile protos.